Skip to content

Conversation

@RedKinda
Copy link

Motivation

Currently IO operations for fs::File is implemented using spawn_blocking to perform IO, which is not great for an async runtime. This PR implements IO operations using io_uring. The fs::write function already uses io_uring for async operations, and this PR extends the uring functionality into impl AsyncWrite for fs::File.

For full discussion see here #7684 and subsequently on discord https://discord.com/channels/500028886025895936/810724255046172692/1427735341003051079

Solution

The fs::File struct already implements an internal buffer when writing using spawn_blocking, which is similar to what io_uring requires. Therefore in poll_write, we can check if io_uring is initialized and supported, and if not fall back to the spawn_blocking implementation, without requiring any API changes.

This PR only implements AsyncWrite, and once this PR is accepted, I am happy to also open a PR for AsyncRead for File using the same structure.

@ADD-SP ADD-SP added C-enhancement Category: A PR with an enhancement or bugfix. A-tokio Area: The main tokio crate M-fs Module: tokio/fs labels Oct 27, 2025
}

cfg_io_uring! {
struct BoxedOp<T>(Pin<Box<dyn Future<Output = T> + Send + Sync + 'static>>);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the Future need to be Sync ?
I think it is not really necessary.

Copy link
Author

@RedKinda RedKinda Oct 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In principle you could avoid Sync using src/util/sync_wrapper.rs. But for this use-case, it does not matter.

let handle = BoxedOp(Box::pin(async move {
let (r, buf, _fd) = op.await;
match r {
Ok(_n) => (Operation::Write(Ok(())), buf),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there is a chance that the buffer is not fully drained here.
Write(Ok()) should be returned only if buf.is_empty().

I think you need to wrap the write in a loop and handle it with something like:

Ok(0) => break (Operation::Write(Err(io::ErrorKind::WriteZero.into())), buf),
Ok(_) if buf.is_empty() => break (Operation::Write(Ok(())), buf),
Ok(_) => continue, // more to write
Err(e) => break (Operation::Write(Err(e)), buf),

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, thanks and fixed

let handle = BoxedOp(Box::pin(async move {
let (r, buf, _fd) = op.await;
match r {
Ok(_n) => (Operation::Write(Ok(())), buf),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above - make sure the buf is empty before saying Ok().

@RedKinda
Copy link
Author

This should be ready for merging i think, is there anything else blocking it?

@Darksonn
Copy link
Contributor

Sorry I've been too focused on #7696. I will try to review this PR too.

Comment on lines 826 to 831
// Handle not present in some tests?
if let Ok(handle) = Handle::try_current() {
if handle.inner.driver().io().check_and_init()? {
task_join_handle = {
use crate::{io::uring::utils::ArcFd, runtime::driver::op::Op};

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is way too many levels of indentation. We need to pull some logic into a separate function.

Comment on lines 113 to 123
#[derive(Debug)]
enum JoinHandleInner<T> {
Blocking(JoinHandle<T>),
#[cfg(all(
tokio_unstable,
feature = "io-uring",
feature = "rt",
feature = "fs",
target_os = "linux"
))]
Async(BoxedOp<T>),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is an idea:

What if you perform the io-uring logic in a tokio::spawn task? That way, you can use JoinHandle in both cases.

I think it will simply code a lot.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Blocking(JoinHandle) is crate::blocking::JoinHandle not tokio::runtime::task::join::JoinHandle so some enum would be required anyway, and at that point might as well just have this kind of a boxed future

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is a re-export of the same thing.

pub(crate) use crate::task::JoinHandle;

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One reason that I suggest this is that the io-uring logic performs multiple writes in a loop to write the entire buffer, which requires the end-user to await flush or write on the file to make progress. However, the implementation we have today does not require the user to interact with the file to make progress - it happens in the background automatically.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah got it, I'll change it.

Out of curiosity, general rule is that for a future to keep making progress, user needs to keep polling it. I feel like not having to do that specifically for fs::File is just an implementation detail, and users shouldn't rely on that always being the case. So i guess my question is how much is it worth preserving old behavior like this when implementing new features, if it comes at the cost of (small) performance overhead? relevant XKCD

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Had to adjust mocks to accommodate non-blocking spawning https://github.com/tokio-rs/tokio/actions/runs/19464918802/job/55697554687 b91836c - can you doublecheck im doing it right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This kind of thing is always a hard question. Though I would say that I'm pretty sure people rely on writes continuing even if the file is dropped, and it would be very hard to change that.

}

impl Op<Write> {
/// Issue a write that starts at `buf_offset` within `buf` and writes some bytes
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This docstring seems to be outdated. buf_offset is no more a parameter

let driver_handle = handle.inner.driver().io();
if driver_handle.check_and_init()? {
return write_uring(path, contents).await;
use crate::io::blocking;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this import needed here ?
It is already imported at line 8

buf,
);
}
Ok(_) if buf.is_empty() => {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this arm should be moved to be first, i.e. before the Ok(0) one.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

A-tokio Area: The main tokio crate C-enhancement Category: A PR with an enhancement or bugfix. M-fs Module: tokio/fs

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants